/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2012, 2014, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 *
 * lnet/selftest/conctl.c
 *
 * Infrastructure of LST console
 *
 * Author: Liang Zhen <liangzhen@clusterfs.com>
 */

#include <libcfs/libcfs.h>
#include <lnet/lib-lnet.h>
#include "console.h"
#include "conrpc.h"

#define LST_NODE_STATE_COUNTER(nd, p)                   \
do {                                                    \
        if ((nd)->nd_state == LST_NODE_ACTIVE)          \
                (p)->nle_nactive ++;                    \
        else if ((nd)->nd_state == LST_NODE_BUSY)       \
                (p)->nle_nbusy ++;                      \
        else if ((nd)->nd_state == LST_NODE_DOWN)       \
                (p)->nle_ndown ++;                      \
        else                                            \
                (p)->nle_nunknown ++;                   \
        (p)->nle_nnode ++;                              \
} while (0)

struct lstcon_session console_session;

static void
lstcon_node_get(struct lstcon_node *nd)
{
        LASSERT (nd->nd_ref >= 1);

        nd->nd_ref++;
}

static int
lstcon_node_find(struct lnet_process_id id, struct lstcon_node **ndpp,
		 int create)
{
	struct lstcon_ndlink *ndl;
	unsigned int idx = LNET_NIDADDR(id.nid) % LST_GLOBAL_HASHSIZE;

	LASSERT(id.nid != LNET_NID_ANY);

	list_for_each_entry(ndl, &console_session.ses_ndl_hash[idx],
			    ndl_hlink) {
		if (ndl->ndl_node->nd_id.nid != id.nid ||
		    ndl->ndl_node->nd_id.pid != id.pid)
			continue;

		lstcon_node_get(ndl->ndl_node);
		*ndpp = ndl->ndl_node;
		return 0;
	}

        if (!create)
                return -ENOENT;

	LIBCFS_ALLOC(*ndpp, sizeof(**ndpp) + sizeof(*ndl));
	if (*ndpp == NULL)
		return -ENOMEM;

	ndl = (struct lstcon_ndlink *)(*ndpp + 1);

	ndl->ndl_node = *ndpp;

	ndl->ndl_node->nd_ref   = 1;
	ndl->ndl_node->nd_id    = id;
	ndl->ndl_node->nd_stamp = ktime_get();
	ndl->ndl_node->nd_state = LST_NODE_UNKNOWN;
	ndl->ndl_node->nd_timeout = 0;
	memset(&ndl->ndl_node->nd_ping, 0, sizeof(ndl->ndl_node->nd_ping));

	/* queued in global hash & list, no refcount is taken by
	 * global hash & list, if caller release his refcount,
	 * node will be released */
	list_add_tail(&ndl->ndl_hlink, &console_session.ses_ndl_hash[idx]);
	list_add_tail(&ndl->ndl_link, &console_session.ses_ndl_list);

	return 0;
}

static void
lstcon_node_put(struct lstcon_node *nd)
{
	struct lstcon_ndlink *ndl;

	LASSERT(nd->nd_ref > 0);

	if (--nd->nd_ref > 0)
		return;

	ndl = (struct lstcon_ndlink *)(nd + 1);

	LASSERT(!list_empty(&ndl->ndl_link));
	LASSERT(!list_empty(&ndl->ndl_hlink));

	/* remove from session */
	list_del(&ndl->ndl_link);
	list_del(&ndl->ndl_hlink);

	LIBCFS_FREE(nd, sizeof(*nd) + sizeof(*ndl));
}

static int
lstcon_ndlink_find(struct list_head *hash, struct lnet_process_id id,
		   struct lstcon_ndlink **ndlpp, int create)
{
	unsigned int idx = LNET_NIDADDR(id.nid) % LST_NODE_HASHSIZE;
	struct lstcon_ndlink *ndl;
	struct lstcon_node *nd;
	int rc;

	if (id.nid == LNET_NID_ANY)
		return -EINVAL;

	/* search in hash */
	list_for_each_entry(ndl, &hash[idx], ndl_hlink) {
		if (ndl->ndl_node->nd_id.nid != id.nid ||
		    ndl->ndl_node->nd_id.pid != id.pid)
			continue;

		*ndlpp = ndl;
		return 0;
	}

        if (create == 0)
                return -ENOENT;

        /* find or create in session hash */
        rc = lstcon_node_find(id, &nd, (create == 1) ? 1 : 0);
        if (rc != 0)
                return rc;

	LIBCFS_ALLOC(ndl, sizeof(*ndl));
        if (ndl == NULL) {
                lstcon_node_put(nd);
                return -ENOMEM;
        }

        *ndlpp = ndl;

	ndl->ndl_node = nd;
	INIT_LIST_HEAD(&ndl->ndl_link);
	list_add_tail(&ndl->ndl_hlink, &hash[idx]);

	return 0;
}

static void
lstcon_ndlink_release(struct lstcon_ndlink *ndl)
{
	LASSERT(list_empty(&ndl->ndl_link));
	LASSERT(!list_empty(&ndl->ndl_hlink));

	list_del(&ndl->ndl_hlink); /* delete from hash */
        lstcon_node_put(ndl->ndl_node);

        LIBCFS_FREE(ndl, sizeof(*ndl));
}

static int
lstcon_group_alloc(char *name, struct lstcon_group **grpp)
{
	struct lstcon_group *grp;
	int i;

	LIBCFS_ALLOC(grp, offsetof(struct lstcon_group,
                                   grp_ndl_hash[LST_NODE_HASHSIZE]));
        if (grp == NULL)
                return -ENOMEM;

        grp->grp_ref = 1;
	if (name != NULL) {
		if (strlen(name) > sizeof(grp->grp_name)-1) {
			LIBCFS_FREE(grp, offsetof(struct lstcon_group,
					  grp_ndl_hash[LST_NODE_HASHSIZE]));
			return -E2BIG;
		}
		strncpy(grp->grp_name, name, sizeof(grp->grp_name));
	}

	INIT_LIST_HEAD(&grp->grp_link);
	INIT_LIST_HEAD(&grp->grp_ndl_list);
	INIT_LIST_HEAD(&grp->grp_trans_list);

	for (i = 0; i < LST_NODE_HASHSIZE; i++)
		INIT_LIST_HEAD(&grp->grp_ndl_hash[i]);

	*grpp = grp;

	return 0;
}

static void
lstcon_group_addref(struct lstcon_group *grp)
{
	grp->grp_ref++;
}

static void lstcon_group_ndlink_release(struct lstcon_group *,
					struct lstcon_ndlink *);

static void
lstcon_group_drain(struct lstcon_group *grp, int keep)
{
	struct lstcon_ndlink *ndl;
	struct lstcon_ndlink *tmp;

	list_for_each_entry_safe(ndl, tmp, &grp->grp_ndl_list, ndl_link) {
		if ((ndl->ndl_node->nd_state & keep) == 0)
			lstcon_group_ndlink_release(grp, ndl);
	}
}

static void
lstcon_group_decref(struct lstcon_group *grp)
{
	int i;

	if (--grp->grp_ref > 0)
		return;

	if (!list_empty(&grp->grp_link))
		list_del(&grp->grp_link);

	lstcon_group_drain(grp, 0);

	for (i = 0; i < LST_NODE_HASHSIZE; i++)
		LASSERT(list_empty(&grp->grp_ndl_hash[i]));

	LIBCFS_FREE(grp, offsetof(struct lstcon_group,
				  grp_ndl_hash[LST_NODE_HASHSIZE]));
}

static int
lstcon_group_find(const char *name, struct lstcon_group **grpp)
{
	struct lstcon_group *grp;

	list_for_each_entry(grp, &console_session.ses_grp_list, grp_link) {
		if (strncmp(grp->grp_name, name, LST_NAME_SIZE) != 0)
			continue;

		lstcon_group_addref(grp);  /* +1 ref for caller */
		*grpp = grp;
		return 0;
	}

	return -ENOENT;
}

static int
lstcon_group_ndlink_find(struct lstcon_group *grp, struct lnet_process_id id,
			 struct lstcon_ndlink **ndlpp, int create)
{
	int rc;

	rc = lstcon_ndlink_find(&grp->grp_ndl_hash[0], id, ndlpp, create);
	if (rc != 0)
		return rc;

	if (!list_empty(&(*ndlpp)->ndl_link))
		return 0;

	list_add_tail(&(*ndlpp)->ndl_link, &grp->grp_ndl_list);
	grp->grp_nnode++;

	return 0;
}

static void
lstcon_group_ndlink_release(struct lstcon_group *grp, struct lstcon_ndlink *ndl)
{
	list_del_init(&ndl->ndl_link);
	lstcon_ndlink_release(ndl);
	grp->grp_nnode--;
}

static void
lstcon_group_ndlink_move(struct lstcon_group *old,
			 struct lstcon_group *new, struct lstcon_ndlink *ndl)
{
	unsigned int idx = LNET_NIDADDR(ndl->ndl_node->nd_id.nid) %
					LST_NODE_HASHSIZE;

	old->grp_nnode--;

	list_move_tail(&ndl->ndl_hlink, &new->grp_ndl_hash[idx]);
	list_move_tail(&ndl->ndl_link, &new->grp_ndl_list);
	new->grp_nnode++;
}

static void
lstcon_group_move(struct lstcon_group *old, struct lstcon_group *new)
{
	struct lstcon_ndlink *ndl;

	while (!list_empty(&old->grp_ndl_list)) {
		ndl = list_entry(old->grp_ndl_list.next,
				 struct lstcon_ndlink, ndl_link);
		lstcon_group_ndlink_move(old, new, ndl);
	}
}

static int
lstcon_sesrpc_condition(int transop, struct lstcon_node *nd, void *arg)
{
	struct lstcon_group *grp = arg;

        switch (transop) {
        case LST_TRANS_SESNEW:
                if (nd->nd_state == LST_NODE_ACTIVE)
                        return 0;
                break;

        case LST_TRANS_SESEND:
                if (nd->nd_state != LST_NODE_ACTIVE)
                        return 0;

                if (grp != NULL && nd->nd_ref > 1)
                        return 0;
                break;

        case LST_TRANS_SESQRY:
                break;

        default:
                LBUG();
        }

        return 1;
}

static int
lstcon_sesrpc_readent(int transop, struct srpc_msg *msg,
		      struct lstcon_rpc_ent __user *ent_up)
{
	struct srpc_debug_reply *rep;

        switch (transop) {
        case LST_TRANS_SESNEW:
        case LST_TRANS_SESEND:
                return 0;

        case LST_TRANS_SESQRY:
                rep = &msg->msg_body.dbg_reply;

		if (copy_to_user(&ent_up->rpe_priv[0],
                                     &rep->dbg_timeout, sizeof(int)) ||
		    copy_to_user(&ent_up->rpe_payload[0],
                                     &rep->dbg_name, LST_NAME_SIZE))
                        return -EFAULT;

                return 0;

        default:
                LBUG();
        }

        return 0;
}

static int
lstcon_group_nodes_add(struct lstcon_group *grp,
		       int count, struct lnet_process_id __user *ids_up,
		       unsigned int *featp,
		       struct list_head __user *result_up)
{
	struct lstcon_rpc_trans *trans;
	struct lstcon_ndlink *ndl;
	struct lstcon_group *tmp;
	struct lnet_process_id id;
	int i;
	int rc;

        rc = lstcon_group_alloc(NULL, &tmp);
        if (rc != 0) {
                CERROR("Out of memory\n");
                return -ENOMEM;
        }

        for (i = 0 ; i < count; i++) {
		if (copy_from_user(&id, &ids_up[i], sizeof(id))) {
                        rc = -EFAULT;
                        break;
                }

                /* skip if it's in this group already */
                rc = lstcon_group_ndlink_find(grp, id, &ndl, 0);
                if (rc == 0)
                        continue;

                /* add to tmp group */
                rc = lstcon_group_ndlink_find(tmp, id, &ndl, 1);
                if (rc != 0) {
                        CERROR("Can't create ndlink, out of memory\n");
                        break;
                }
        }

        if (rc != 0) {
		lstcon_group_decref(tmp);
                return rc;
        }

        rc = lstcon_rpc_trans_ndlist(&tmp->grp_ndl_list,
                                     &tmp->grp_trans_list, LST_TRANS_SESNEW,
                                     tmp, lstcon_sesrpc_condition, &trans);
        if (rc != 0) {
                CERROR("Can't create transaction: %d\n", rc);
		lstcon_group_decref(tmp);
                return rc;
        }

        /* post all RPCs */
        lstcon_rpc_trans_postwait(trans, LST_TRANS_TIMEOUT);

        rc = lstcon_rpc_trans_interpreter(trans, result_up,
                                          lstcon_sesrpc_readent);
	*featp = trans->tas_features;

        /* destroy all RPGs */
        lstcon_rpc_trans_destroy(trans);

        lstcon_group_move(tmp, grp);
	lstcon_group_decref(tmp);

        return rc;
}

static int
lstcon_group_nodes_remove(struct lstcon_group *grp,
			  int count, struct lnet_process_id __user *ids_up,
			  struct list_head __user *result_up)
{
	struct lstcon_rpc_trans *trans;
	struct lstcon_ndlink *ndl;
	struct lstcon_group *tmp;
	struct lnet_process_id id;
	int rc;
	int i;

        /* End session and remove node from the group */

        rc = lstcon_group_alloc(NULL, &tmp);
        if (rc != 0) {
                CERROR("Out of memory\n");
                return -ENOMEM;
        }

        for (i = 0; i < count; i++) {
		if (copy_from_user(&id, &ids_up[i], sizeof(id))) {
                        rc = -EFAULT;
                        goto error;
                }

                /* move node to tmp group */
                if (lstcon_group_ndlink_find(grp, id, &ndl, 0) == 0)
                        lstcon_group_ndlink_move(grp, tmp, ndl);
        }

        rc = lstcon_rpc_trans_ndlist(&tmp->grp_ndl_list,
                                     &tmp->grp_trans_list, LST_TRANS_SESEND,
                                     tmp, lstcon_sesrpc_condition, &trans);
        if (rc != 0) {
                CERROR("Can't create transaction: %d\n", rc);
                goto error;
        }

        lstcon_rpc_trans_postwait(trans, LST_TRANS_TIMEOUT);

        rc = lstcon_rpc_trans_interpreter(trans, result_up, NULL);

        lstcon_rpc_trans_destroy(trans);
        /* release nodes anyway, because we can't rollback status */
	lstcon_group_decref(tmp);

        return rc;
error:
        lstcon_group_move(tmp, grp);
	lstcon_group_decref(tmp);

        return rc;
}

int
lstcon_group_add(char *name)
{
	struct lstcon_group *grp;
	int rc;

        rc = (lstcon_group_find(name, &grp) == 0)? -EEXIST: 0;
        if (rc != 0) {
                /* find a group with same name */
		lstcon_group_decref(grp);
                return rc;
        }

        rc = lstcon_group_alloc(name, &grp);
        if (rc != 0) {
                CERROR("Can't allocate descriptor for group %s\n", name);
                return -ENOMEM;
        }

	list_add_tail(&grp->grp_link, &console_session.ses_grp_list);

	return rc;
}

int
lstcon_nodes_add(char *name, int count, struct lnet_process_id __user *ids_up,
		 unsigned *featp, struct list_head __user *result_up)
{
	struct lstcon_group         *grp;
        int                     rc;

        LASSERT (count > 0);
        LASSERT (ids_up != NULL);

        rc = lstcon_group_find(name, &grp);
        if (rc != 0) {
                CDEBUG(D_NET, "Can't find group %s\n", name);
                return rc;
        }

        if (grp->grp_ref > 2) {
                /* referred by other threads or test */
                CDEBUG(D_NET, "Group %s is busy\n", name);
		lstcon_group_decref(grp);

                return -EBUSY;
        }

	rc = lstcon_group_nodes_add(grp, count, ids_up, featp, result_up);

	lstcon_group_decref(grp);

        return rc;
}

int
lstcon_group_del(char *name)
{
	struct lstcon_rpc_trans *trans;
	struct lstcon_group *grp;
	int rc;

        rc = lstcon_group_find(name, &grp);
        if (rc != 0) {
                CDEBUG(D_NET, "Can't find group: %s\n", name);
                return rc;
        }

        if (grp->grp_ref > 2) {
                /* referred by others threads or test */
                CDEBUG(D_NET, "Group %s is busy\n", name);
		lstcon_group_decref(grp);
                return -EBUSY;
        }

        rc = lstcon_rpc_trans_ndlist(&grp->grp_ndl_list,
                                     &grp->grp_trans_list, LST_TRANS_SESEND,
                                     grp, lstcon_sesrpc_condition, &trans);
        if (rc != 0) {
                CERROR("Can't create transaction: %d\n", rc);
		lstcon_group_decref(grp);
                return rc;
        }

        lstcon_rpc_trans_postwait(trans, LST_TRANS_TIMEOUT);

        lstcon_rpc_trans_destroy(trans);

	lstcon_group_decref(grp);
        /* -ref for session, it's destroyed,
         * status can't be rolled back, destroy group anway */
	lstcon_group_decref(grp);

        return rc;
}

int
lstcon_group_clean(char *name, int args)
{
	struct lstcon_group *grp = NULL;
	int rc;

        rc = lstcon_group_find(name, &grp);
        if (rc != 0) {
                CDEBUG(D_NET, "Can't find group %s\n", name);
                return rc;
        }

        if (grp->grp_ref > 2) {
                /* referred by test */
                CDEBUG(D_NET, "Group %s is busy\n", name);
		lstcon_group_decref(grp);
                return -EBUSY;
        }

        args = (LST_NODE_ACTIVE | LST_NODE_BUSY |
                LST_NODE_DOWN | LST_NODE_UNKNOWN) & ~args;

        lstcon_group_drain(grp, args);

	lstcon_group_decref(grp);
	/* release empty group */
	if (list_empty(&grp->grp_ndl_list))
		lstcon_group_decref(grp);

	return 0;
}

int
lstcon_nodes_remove(char *name, int count,
		    struct lnet_process_id __user *ids_up,
		    struct list_head __user *result_up)
{
	struct lstcon_group *grp = NULL;
	int rc;

        rc = lstcon_group_find(name, &grp);
        if (rc != 0) {
                CDEBUG(D_NET, "Can't find group: %s\n", name);
                return rc;
        }

        if (grp->grp_ref > 2) {
                /* referred by test */
                CDEBUG(D_NET, "Group %s is busy\n", name);
		lstcon_group_decref(grp);
                return -EBUSY;
        }

        rc = lstcon_group_nodes_remove(grp, count, ids_up, result_up);

	lstcon_group_decref(grp);
	/* release empty group */
	if (list_empty(&grp->grp_ndl_list))
		lstcon_group_decref(grp);

	return rc;
}

int
lstcon_group_refresh(char *name, struct list_head __user *result_up)
{
	struct lstcon_rpc_trans *trans;
	struct lstcon_group *grp;
	int rc;

        rc = lstcon_group_find(name, &grp);
        if (rc != 0) {
                CDEBUG(D_NET, "Can't find group: %s\n", name);
                return rc;
        }

        if (grp->grp_ref > 2) {
                /* referred by test */
                CDEBUG(D_NET, "Group %s is busy\n", name);
		lstcon_group_decref(grp);
                return -EBUSY;
        }

        /* re-invite all inactive nodes int the group */
        rc = lstcon_rpc_trans_ndlist(&grp->grp_ndl_list,
                                     &grp->grp_trans_list, LST_TRANS_SESNEW,
                                     grp, lstcon_sesrpc_condition, &trans);
        if (rc != 0) {
                /* local error, return */
                CDEBUG(D_NET, "Can't create transaction: %d\n", rc);
		lstcon_group_decref(grp);
                return rc;
        }

        lstcon_rpc_trans_postwait(trans, LST_TRANS_TIMEOUT);

        rc = lstcon_rpc_trans_interpreter(trans, result_up, NULL);

        lstcon_rpc_trans_destroy(trans);
        /* -ref for me */
	lstcon_group_decref(grp);

        return rc;
}

int
lstcon_group_list(int index, int len, char __user *name_up)
{
	struct lstcon_group *grp;

	LASSERT(index >= 0);
	LASSERT(name_up != NULL);

	list_for_each_entry(grp, &console_session.ses_grp_list, grp_link) {
		if (index-- == 0) {
			return copy_to_user(name_up, grp->grp_name, len) ?
					    -EFAULT : 0;
		}
	}

	return -ENOENT;
}

static int
lstcon_nodes_getent(struct list_head *head, int *index_p,
		    int *count_p, struct lstcon_node_ent __user *dents_up)
{
	struct lstcon_ndlink *ndl;
	struct lstcon_node *nd;
	int count = 0;
	int index = 0;

	LASSERT(index_p != NULL && count_p != NULL);
	LASSERT(dents_up != NULL);
	LASSERT(*index_p >= 0);
	LASSERT(*count_p > 0);

	list_for_each_entry(ndl, head, ndl_link) {
		if (index++ < *index_p)
			continue;

                if (count >= *count_p)
                        break;

                nd = ndl->ndl_node;
		if (copy_to_user(&dents_up[count].nde_id,
                                     &nd->nd_id, sizeof(nd->nd_id)) ||
		    copy_to_user(&dents_up[count].nde_state,
                                     &nd->nd_state, sizeof(nd->nd_state)))
                        return -EFAULT;

                count ++;
        }

        if (index <= *index_p)
                return -ENOENT;

        *count_p = count;
        *index_p = index;

        return 0;
}

int
lstcon_group_info(char *name, struct lstcon_ndlist_ent __user *gents_p,
		  int *index_p, int *count_p,
		  struct lstcon_node_ent __user *dents_up)
{
	struct lstcon_ndlist_ent *gentp;
	struct lstcon_group *grp;
	struct lstcon_ndlink *ndl;
	int rc;

        rc = lstcon_group_find(name, &grp);
        if (rc != 0) {
                CDEBUG(D_NET, "Can't find group %s\n", name);
                return rc;
        }

	if (dents_up != NULL) {
                /* verbose query */
                rc = lstcon_nodes_getent(&grp->grp_ndl_list,
                                         index_p, count_p, dents_up);
		lstcon_group_decref(grp);

                return rc;
        }

        /* non-verbose query */
	CFS_ALLOC_PTR(gentp);
        if (gentp == NULL) {
                CERROR("Can't allocate ndlist_ent\n");
		lstcon_group_decref(grp);

                return -ENOMEM;
        }

	list_for_each_entry(ndl, &grp->grp_ndl_list, ndl_link)
                LST_NODE_STATE_COUNTER(ndl->ndl_node, gentp);

	rc = copy_to_user(gents_p, gentp,
			  sizeof(struct lstcon_ndlist_ent)) ? -EFAULT : 0;

	CFS_FREE_PTR(gentp);

	lstcon_group_decref(grp);

	return 0;
}

static int
lstcon_batch_find(const char *name, struct lstcon_batch **batpp)
{
	struct lstcon_batch *bat;

	list_for_each_entry(bat, &console_session.ses_bat_list, bat_link) {
		if (strncmp(bat->bat_name, name, LST_NAME_SIZE) == 0) {
			*batpp = bat;
			return 0;
		}
	}

	return -ENOENT;
}

int
lstcon_batch_add(char *name)
{
	struct lstcon_batch *bat;
	int i;
	int rc;

        rc = (lstcon_batch_find(name, &bat) == 0)? -EEXIST: 0;
        if (rc != 0) {
                CDEBUG(D_NET, "Batch %s already exists\n", name);
                return rc;
        }

	LIBCFS_ALLOC(bat, sizeof(*bat));
        if (bat == NULL) {
                CERROR("Can't allocate descriptor for batch %s\n", name);
                return -ENOMEM;
        }

	CFS_ALLOC_PTR_ARRAY(bat->bat_cli_hash, LST_NODE_HASHSIZE);
	if (bat->bat_cli_hash == NULL) {
		CERROR("Can't allocate hash for batch %s\n", name);
		LIBCFS_FREE(bat, sizeof(*bat));

		return -ENOMEM;
	}

	CFS_ALLOC_PTR_ARRAY(bat->bat_srv_hash, LST_NODE_HASHSIZE);
	if (bat->bat_srv_hash == NULL) {
		CERROR("Can't allocate hash for batch %s\n", name);
		LIBCFS_FREE(bat->bat_cli_hash, LST_NODE_HASHSIZE);
		LIBCFS_FREE(bat, sizeof(*bat));

		return -ENOMEM;
	}

	if (strlen(name) > sizeof(bat->bat_name)-1) {
		LIBCFS_FREE(bat->bat_srv_hash, LST_NODE_HASHSIZE);
		LIBCFS_FREE(bat->bat_cli_hash, LST_NODE_HASHSIZE);
		LIBCFS_FREE(bat, sizeof(*bat));
		return -E2BIG;
	}
	strncpy(bat->bat_name, name, sizeof(bat->bat_name));
        bat->bat_hdr.tsb_index = 0;
        bat->bat_hdr.tsb_id.bat_id = ++console_session.ses_id_cookie;

        bat->bat_ntest = 0;
        bat->bat_state = LST_BATCH_IDLE;

	INIT_LIST_HEAD(&bat->bat_cli_list);
	INIT_LIST_HEAD(&bat->bat_srv_list);
	INIT_LIST_HEAD(&bat->bat_test_list);
	INIT_LIST_HEAD(&bat->bat_trans_list);

        for (i = 0; i < LST_NODE_HASHSIZE; i++) {
		INIT_LIST_HEAD(&bat->bat_cli_hash[i]);
		INIT_LIST_HEAD(&bat->bat_srv_hash[i]);
        }

	list_add_tail(&bat->bat_link, &console_session.ses_bat_list);

        return rc;
}

int
lstcon_batch_list(int index, int len, char __user *name_up)
{
	struct lstcon_batch *bat;

	LASSERT(name_up != NULL);
	LASSERT(index >= 0);

	list_for_each_entry(bat, &console_session.ses_bat_list, bat_link) {
		if (index-- == 0) {
			return copy_to_user(name_up, bat->bat_name, len) ?
					    -EFAULT : 0;
		}
	}

	return -ENOENT;
}

int
lstcon_batch_info(char *name, struct lstcon_test_batch_ent __user *ent_up,
		  int server, int testidx, int *index_p, int *ndent_p,
		  struct lstcon_node_ent __user *dents_up)
{
	struct lstcon_test_batch_ent *entp;
	struct list_head *clilst;
	struct list_head *srvlst;
	struct lstcon_test *test = NULL;
	struct lstcon_batch *bat;
	struct lstcon_ndlink *ndl;
	int rc;

        rc = lstcon_batch_find(name, &bat);
        if (rc != 0) {
                CDEBUG(D_NET, "Can't find batch %s\n", name);
                return -ENOENT;
        }

	if (testidx > 0) {
		/* query test, test index start from 1 */
		list_for_each_entry(test, &bat->bat_test_list, tes_link) {
			if (testidx-- == 1)
				break;
		}

                if (testidx > 0) {
                        CDEBUG(D_NET, "Can't find specified test in batch\n");
                        return -ENOENT;
                }
        }

        clilst = (test == NULL) ? &bat->bat_cli_list :
                                  &test->tes_src_grp->grp_ndl_list;
        srvlst = (test == NULL) ? &bat->bat_srv_list :
                                  &test->tes_dst_grp->grp_ndl_list;

        if (dents_up != NULL) {
                rc = lstcon_nodes_getent((server ? srvlst: clilst),
                                         index_p, ndent_p, dents_up);
                return rc;
        }

        /* non-verbose query */
	CFS_ALLOC_PTR(entp);
        if (entp == NULL)
                return -ENOMEM;

        if (test == NULL) {
                entp->u.tbe_batch.bae_ntest = bat->bat_ntest;
                entp->u.tbe_batch.bae_state = bat->bat_state;

        } else {

                entp->u.tbe_test.tse_type   = test->tes_type;
                entp->u.tbe_test.tse_loop   = test->tes_loop;
                entp->u.tbe_test.tse_concur = test->tes_concur;
        }

	list_for_each_entry(ndl, clilst, ndl_link)
		LST_NODE_STATE_COUNTER(ndl->ndl_node, &entp->tbe_cli_nle);

	list_for_each_entry(ndl, srvlst, ndl_link)
		LST_NODE_STATE_COUNTER(ndl->ndl_node, &entp->tbe_srv_nle);

	rc = copy_to_user(ent_up, entp,
			  sizeof(struct lstcon_test_batch_ent)) ? -EFAULT : 0;

	CFS_FREE_PTR(entp)

	return rc;
}

static int
lstcon_batrpc_condition(int transop, struct lstcon_node *nd, void *arg)
{
        switch (transop) {
        case LST_TRANS_TSBRUN:
                if (nd->nd_state != LST_NODE_ACTIVE)
                        return -ENETDOWN;
                break;

        case LST_TRANS_TSBSTOP:
                if (nd->nd_state != LST_NODE_ACTIVE)
                        return 0;
                break;

        case LST_TRANS_TSBCLIQRY:
        case LST_TRANS_TSBSRVQRY:
                break;
        }

        return 1;
}

static int
lstcon_batch_op(struct lstcon_batch *bat, int transop,
		struct list_head __user *result_up)
{
	struct lstcon_rpc_trans *trans;
        int                 rc;

        rc = lstcon_rpc_trans_ndlist(&bat->bat_cli_list,
                                     &bat->bat_trans_list, transop,
                                     bat, lstcon_batrpc_condition, &trans);
        if (rc != 0) {
                CERROR("Can't create transaction: %d\n", rc);
                return rc;
        }

        lstcon_rpc_trans_postwait(trans, LST_TRANS_TIMEOUT);

        rc = lstcon_rpc_trans_interpreter(trans, result_up, NULL);

        lstcon_rpc_trans_destroy(trans);

        return rc;
}

int
lstcon_batch_run(char *name, int timeout, struct list_head __user *result_up)
{
	struct lstcon_batch *bat;
	int rc;

        if (lstcon_batch_find(name, &bat) != 0) {
                CDEBUG(D_NET, "Can't find batch %s\n", name);
                return -ENOENT;
        }

        bat->bat_arg = timeout;

        rc = lstcon_batch_op(bat, LST_TRANS_TSBRUN, result_up);

        /* mark batch as running if it's started in any node */
        if (lstcon_tsbop_stat_success(lstcon_trans_stat(), 0) != 0)
                bat->bat_state = LST_BATCH_RUNNING;

        return rc;
}

int
lstcon_batch_stop(char *name, int force, struct list_head __user *result_up)
{
	struct lstcon_batch *bat;
	int rc;

        if (lstcon_batch_find(name, &bat) != 0) {
                CDEBUG(D_NET, "Can't find batch %s\n", name);
                return -ENOENT;
        }

        bat->bat_arg = force;

        rc = lstcon_batch_op(bat, LST_TRANS_TSBSTOP, result_up);

        /* mark batch as stopped if all RPCs finished */
        if (lstcon_tsbop_stat_failure(lstcon_trans_stat(), 0) == 0)
                bat->bat_state = LST_BATCH_IDLE;

        return rc;
}

static void
lstcon_batch_destroy(struct lstcon_batch *bat)
{
	struct lstcon_ndlink *ndl;
	struct lstcon_test *test;
	int i;

	list_del(&bat->bat_link);

	while (!list_empty(&bat->bat_test_list)) {
		test = list_entry(bat->bat_test_list.next,
				  struct lstcon_test, tes_link);
		LASSERT(list_empty(&test->tes_trans_list));

		list_del(&test->tes_link);

		lstcon_group_decref(test->tes_src_grp);
		lstcon_group_decref(test->tes_dst_grp);

		LIBCFS_FREE(test, offsetof(struct lstcon_test,
					   tes_param[test->tes_paramlen]));
	}

	LASSERT(list_empty(&bat->bat_trans_list));

	while (!list_empty(&bat->bat_cli_list)) {
		ndl = list_entry(bat->bat_cli_list.next,
				 struct lstcon_ndlink, ndl_link);
		list_del_init(&ndl->ndl_link);

		lstcon_ndlink_release(ndl);
	}

	while (!list_empty(&bat->bat_srv_list)) {
		ndl = list_entry(bat->bat_srv_list.next,
				 struct lstcon_ndlink, ndl_link);
		list_del_init(&ndl->ndl_link);

		lstcon_ndlink_release(ndl);
	}

	for (i = 0; i < LST_NODE_HASHSIZE; i++) {
		LASSERT(list_empty(&bat->bat_cli_hash[i]));
		LASSERT(list_empty(&bat->bat_srv_hash[i]));
	}

	LIBCFS_FREE(bat->bat_cli_hash,
		    sizeof(struct list_head) * LST_NODE_HASHSIZE);
	LIBCFS_FREE(bat->bat_srv_hash,
		    sizeof(struct list_head) * LST_NODE_HASHSIZE);
	LIBCFS_FREE(bat, sizeof(*bat));
}

static int
lstcon_testrpc_condition(int transop, struct lstcon_node *nd, void *arg)
{
	struct lstcon_test *test = arg;
	struct lstcon_batch *batch;
	struct lstcon_ndlink *ndl;
	struct list_head *hash;
	struct list_head *head;

	LASSERT(test != NULL);

	batch = test->tes_batch;
	LASSERT(batch != NULL);

        if (test->tes_oneside &&
            transop == LST_TRANS_TSBSRVADD)
                return 0;

        if (nd->nd_state != LST_NODE_ACTIVE)
                return -ENETDOWN;

        if (transop == LST_TRANS_TSBCLIADD) {
                hash = batch->bat_cli_hash;
                head = &batch->bat_cli_list;

        } else {
                LASSERT (transop == LST_TRANS_TSBSRVADD);

                hash = batch->bat_srv_hash;
                head = &batch->bat_srv_list;
        }

        LASSERT (nd->nd_id.nid != LNET_NID_ANY);

        if (lstcon_ndlink_find(hash, nd->nd_id, &ndl, 1) != 0)
                return -ENOMEM;

	if (list_empty(&ndl->ndl_link))
		list_add_tail(&ndl->ndl_link, head);

	return 1;
}

static int
lstcon_test_nodes_add(struct lstcon_test *test,
		      struct list_head __user *result_up)
{
	struct lstcon_rpc_trans *trans;
	struct lstcon_group *grp;
	int transop;
	int rc;

        LASSERT (test->tes_src_grp != NULL);
        LASSERT (test->tes_dst_grp != NULL);

        transop = LST_TRANS_TSBSRVADD;
        grp  = test->tes_dst_grp;
again:
        rc = lstcon_rpc_trans_ndlist(&grp->grp_ndl_list,
                                     &test->tes_trans_list, transop,
                                     test, lstcon_testrpc_condition, &trans);
        if (rc != 0) {
                CERROR("Can't create transaction: %d\n", rc);
                return rc;
        }

        lstcon_rpc_trans_postwait(trans, LST_TRANS_TIMEOUT);

        if (lstcon_trans_stat()->trs_rpc_errno != 0 ||
            lstcon_trans_stat()->trs_fwk_errno != 0) {
                lstcon_rpc_trans_interpreter(trans, result_up, NULL);

                lstcon_rpc_trans_destroy(trans);
                /* return if any error */
                CDEBUG(D_NET, "Failed to add test %s, "
                              "RPC error %d, framework error %d\n",
                       transop == LST_TRANS_TSBCLIADD ? "client" : "server",
                       lstcon_trans_stat()->trs_rpc_errno,
                       lstcon_trans_stat()->trs_fwk_errno);

                return rc;
        }

        lstcon_rpc_trans_destroy(trans);

        if (transop == LST_TRANS_TSBCLIADD)
                return rc;

        transop = LST_TRANS_TSBCLIADD;
        grp = test->tes_src_grp;
        test->tes_cliidx = 0;

        /* requests to test clients */
        goto again;
}

static int
lstcon_verify_batch(const char *name, struct lstcon_batch **batch)
{
	int rc;

	rc = lstcon_batch_find(name, batch);
	if (rc != 0) {
		CDEBUG(D_NET, "Can't find batch %s\n", name);
		return rc;
	}

	if ((*batch)->bat_state != LST_BATCH_IDLE) {
		CDEBUG(D_NET, "Can't change running batch %s\n", name);
		return -EINVAL;
	}

	return 0;
}

static int
lstcon_verify_group(const char *name, struct lstcon_group **grp)
{
	int rc;
	struct lstcon_ndlink *ndl;

	rc = lstcon_group_find(name, grp);
	if (rc != 0) {
		CDEBUG(D_NET, "can't find group %s\n", name);
		return rc;
	}

	list_for_each_entry(ndl, &(*grp)->grp_ndl_list, ndl_link) {
		if (ndl->ndl_node->nd_state == LST_NODE_ACTIVE) {
			return 0;
		}
	}

	CDEBUG(D_NET, "Group %s has no ACTIVE nodes\n", name);

	return -EINVAL;
}

int
lstcon_test_add(char *batch_name, int type, int loop,
		int concur, int dist, int span,
		char *src_name, char *dst_name,
		void *param, int paramlen, int *retp,
		struct list_head __user *result_up)
{
	struct lstcon_test *test = NULL;
	int rc;
	struct lstcon_group *src_grp = NULL;
	struct lstcon_group *dst_grp = NULL;
	struct lstcon_batch *batch = NULL;

	/*
	 * verify that a batch of the given name exists, and the groups
	 * that will be part of the batch exist and have at least one
	 * active node
	 */
	rc = lstcon_verify_batch(batch_name, &batch);
	if (rc != 0)
		goto out;

	rc = lstcon_verify_group(src_name, &src_grp);
	if (rc != 0)
		goto out;

	rc = lstcon_verify_group(dst_name, &dst_grp);
	if (rc != 0)
		goto out;

	if (dst_grp->grp_userland)
		*retp = 1;

	LIBCFS_ALLOC(test, offsetof(struct lstcon_test, tes_param[paramlen]));
	if (!test) {
		CERROR("Can't allocate test descriptor\n");
		rc = -ENOMEM;

		goto out;
	}

	test->tes_hdr.tsb_id	= batch->bat_hdr.tsb_id;
	test->tes_batch		= batch;
	test->tes_type		= type;
	test->tes_oneside	= 0; /* TODO */
	test->tes_loop		= loop;
	test->tes_concur	= concur;
	test->tes_stop_onerr	= 1; /* TODO */
	test->tes_span		= span;
	test->tes_dist		= dist;
	test->tes_cliidx	= 0; /* just used for creating RPC */
	test->tes_src_grp	= src_grp;
	test->tes_dst_grp	= dst_grp;
	INIT_LIST_HEAD(&test->tes_trans_list);

	if (param != NULL) {
		test->tes_paramlen = paramlen;
		memcpy(&test->tes_param[0], param, paramlen);
	}

	rc = lstcon_test_nodes_add(test, result_up);

	if (rc != 0)
		goto out;

	if (lstcon_trans_stat()->trs_rpc_errno != 0 ||
	    lstcon_trans_stat()->trs_fwk_errno != 0)
		CDEBUG(D_NET, "Failed to add test %d to batch %s\n", type,
		       batch_name);

	/* add to test list anyway, so user can check what's going on */
	list_add_tail(&test->tes_link, &batch->bat_test_list);

	batch->bat_ntest++;
	test->tes_hdr.tsb_index = batch->bat_ntest;

	/*  hold groups so nobody can change them */
	return rc;
out:
	if (test != NULL)
		LIBCFS_FREE(test, offsetof(struct lstcon_test,
					   tes_param[paramlen]));

	if (dst_grp != NULL)
		lstcon_group_decref(dst_grp);

	if (src_grp != NULL)
		lstcon_group_decref(src_grp);

	return rc;
}

static int
lstcon_test_find(struct lstcon_batch *batch, int idx,
		 struct lstcon_test **testpp)
{
	struct lstcon_test *test;

	list_for_each_entry(test, &batch->bat_test_list, tes_link) {
		if (idx == test->tes_hdr.tsb_index) {
			*testpp = test;
			return 0;
		}
	}

	return -ENOENT;
}

static int
lstcon_tsbrpc_readent(int transop, struct srpc_msg *msg,
		      struct lstcon_rpc_ent __user *ent_up)
{
	struct srpc_batch_reply *rep = &msg->msg_body.bat_reply;

        LASSERT (transop == LST_TRANS_TSBCLIQRY ||
                 transop == LST_TRANS_TSBSRVQRY);

        /* positive errno, framework error code */
	if (copy_to_user(&ent_up->rpe_priv[0],
                             &rep->bar_active, sizeof(rep->bar_active)))
                return -EFAULT;

        return 0;
}

int
lstcon_test_batch_query(char *name, int testidx, int client,
			int timeout, struct list_head __user *result_up)
{
	struct lstcon_rpc_trans *trans;
	struct list_head *translist;
	struct list_head *ndlist;
	struct lstcon_tsb_hdr *hdr;
	struct lstcon_batch *batch;
	struct lstcon_test *test = NULL;
	int transop;
	int rc;

        rc = lstcon_batch_find(name, &batch);
        if (rc != 0) {
                CDEBUG(D_NET, "Can't find batch: %s\n", name);
                return rc;
        }

        if (testidx == 0) {
                translist = &batch->bat_trans_list;
                ndlist    = &batch->bat_cli_list;
                hdr       = &batch->bat_hdr;

        } else {
                /* query specified test only */
                rc = lstcon_test_find(batch, testidx, &test);
                if (rc != 0) {
                        CDEBUG(D_NET, "Can't find test: %d\n", testidx);
                        return rc;
                }

                translist = &test->tes_trans_list;
                ndlist    = &test->tes_src_grp->grp_ndl_list;
                hdr       = &test->tes_hdr;
	}

        transop = client ? LST_TRANS_TSBCLIQRY : LST_TRANS_TSBSRVQRY;

        rc = lstcon_rpc_trans_ndlist(ndlist, translist, transop, hdr,
                                     lstcon_batrpc_condition, &trans);
        if (rc != 0) {
                CERROR("Can't create transaction: %d\n", rc);
                return rc;
        }

        lstcon_rpc_trans_postwait(trans, timeout);

        if (testidx == 0 && /* query a batch, not a test */
            lstcon_rpc_stat_failure(lstcon_trans_stat(), 0) == 0 &&
            lstcon_tsbqry_stat_run(lstcon_trans_stat(), 0) == 0) {
                /* all RPCs finished, and no active test */
                batch->bat_state = LST_BATCH_IDLE;
        }

        rc = lstcon_rpc_trans_interpreter(trans, result_up,
                                          lstcon_tsbrpc_readent);
        lstcon_rpc_trans_destroy(trans);

        return rc;
}

static int
lstcon_statrpc_readent(int transop, struct srpc_msg *msg,
		       struct lstcon_rpc_ent __user *ent_up)
{
	struct srpc_stat_reply *rep = &msg->msg_body.stat_reply;
	struct sfw_counters __user *sfwk_stat;
	struct srpc_counters __user *srpc_stat;
	struct lnet_counters_common __user *lnet_stat;

        if (rep->str_status != 0)
                return 0;

	sfwk_stat = (struct sfw_counters __user *)&ent_up->rpe_payload[0];
	srpc_stat = (struct srpc_counters __user *)
		((char __user *)sfwk_stat + sizeof(*sfwk_stat));
	lnet_stat = (struct lnet_counters_common __user *)
		((char __user *)srpc_stat + sizeof(*srpc_stat));

	if (copy_to_user(sfwk_stat, &rep->str_fw, sizeof(*sfwk_stat)) ||
	    copy_to_user(srpc_stat, &rep->str_rpc, sizeof(*srpc_stat)) ||
	    copy_to_user(lnet_stat, &rep->str_lnet, sizeof(*lnet_stat)))
                return -EFAULT;

        return 0;
}

static int
lstcon_ndlist_stat(struct list_head *ndlist,
		   int timeout, struct list_head __user *result_up)
{
	LIST_HEAD(head);
	struct lstcon_rpc_trans *trans;
	int rc;

        rc = lstcon_rpc_trans_ndlist(ndlist, &head,
                                     LST_TRANS_STATQRY, NULL, NULL, &trans);
        if (rc != 0) {
                CERROR("Can't create transaction: %d\n", rc);
                return rc;
        }

        lstcon_rpc_trans_postwait(trans, LST_VALIDATE_TIMEOUT(timeout));

        rc = lstcon_rpc_trans_interpreter(trans, result_up,
                                          lstcon_statrpc_readent);
        lstcon_rpc_trans_destroy(trans);

        return rc;
}

int
lstcon_group_stat(char *grp_name, int timeout,
		  struct list_head __user *result_up)
{
	struct lstcon_group *grp;
	int rc;

        rc = lstcon_group_find(grp_name, &grp);
        if (rc != 0) {
                CDEBUG(D_NET, "Can't find group %s\n", grp_name);
                return rc;
        }

        rc = lstcon_ndlist_stat(&grp->grp_ndl_list, timeout, result_up);

	lstcon_group_decref(grp);

        return rc;
}

int
lstcon_nodes_stat(int count, struct lnet_process_id __user *ids_up,
		  int timeout, struct list_head __user *result_up)
{
	struct lstcon_ndlink *ndl;
	struct lstcon_group *tmp;
	struct lnet_process_id id;
	int i;
	int rc;

        rc = lstcon_group_alloc(NULL, &tmp);
        if (rc != 0) {
                CERROR("Out of memory\n");
                return -ENOMEM;
        }

        for (i = 0 ; i < count; i++) {
		if (copy_from_user(&id, &ids_up[i], sizeof(id))) {
                        rc = -EFAULT;
                        break;
                }

                /* add to tmp group */
                rc = lstcon_group_ndlink_find(tmp, id, &ndl, 2);
                if (rc != 0) {
                        CDEBUG((rc == -ENOMEM) ? D_ERROR : D_NET,
                               "Failed to find or create %s: %d\n",
                               libcfs_id2str(id), rc);
                        break;
                }
        }

        if (rc != 0) {
		lstcon_group_decref(tmp);
                return rc;
        }

        rc = lstcon_ndlist_stat(&tmp->grp_ndl_list, timeout, result_up);

	lstcon_group_decref(tmp);

        return rc;
}

static int
lstcon_debug_ndlist(struct list_head *ndlist,
		    struct list_head *translist,
		    int timeout, struct list_head __user *result_up)
{
	struct lstcon_rpc_trans *trans;
	int rc;

        rc = lstcon_rpc_trans_ndlist(ndlist, translist, LST_TRANS_SESQRY,
                                     NULL, lstcon_sesrpc_condition, &trans);
        if (rc != 0) {
                CERROR("Can't create transaction: %d\n", rc);
                return rc;
        }

        lstcon_rpc_trans_postwait(trans, LST_VALIDATE_TIMEOUT(timeout));

        rc = lstcon_rpc_trans_interpreter(trans, result_up,
                                          lstcon_sesrpc_readent);
        lstcon_rpc_trans_destroy(trans);

        return rc;
}

int
lstcon_session_debug(int timeout, struct list_head __user *result_up)
{
        return lstcon_debug_ndlist(&console_session.ses_ndl_list,
                                   NULL, timeout, result_up);
}

int
lstcon_batch_debug(int timeout, char *name,
		   int client, struct list_head __user *result_up)
{
	struct lstcon_batch *bat;
	int rc;

        rc = lstcon_batch_find(name, &bat);
        if (rc != 0)
                return -ENOENT;

        rc = lstcon_debug_ndlist(client ? &bat->bat_cli_list :
                                          &bat->bat_srv_list,
                                 NULL, timeout, result_up);

        return rc;
}

int
lstcon_group_debug(int timeout, char *name,
		   struct list_head __user *result_up)
{
	struct lstcon_group *grp;
	int rc;

        rc = lstcon_group_find(name, &grp);
        if (rc != 0)
                return -ENOENT;

        rc = lstcon_debug_ndlist(&grp->grp_ndl_list, NULL,
                                 timeout, result_up);
	lstcon_group_decref(grp);

        return rc;
}

int
lstcon_nodes_debug(int timeout, int count,
		   struct lnet_process_id __user *ids_up,
		   struct list_head __user *result_up)
{
	struct lnet_process_id id;
	struct lstcon_ndlink *ndl;
	struct lstcon_group *grp;
	int i;
	int rc;

        rc = lstcon_group_alloc(NULL, &grp);
        if (rc != 0) {
                CDEBUG(D_NET, "Out of memory\n");
                return rc;
        }

        for (i = 0; i < count; i++) {
		if (copy_from_user(&id, &ids_up[i], sizeof(id))) {
                        rc = -EFAULT;
                        break;
                }

                /* node is added to tmp group */
                rc = lstcon_group_ndlink_find(grp, id, &ndl, 1);
                if (rc != 0) {
                        CERROR("Can't create node link\n");
                        break;
                }
        }

        if (rc != 0) {
		lstcon_group_decref(grp);
                return rc;
        }

        rc = lstcon_debug_ndlist(&grp->grp_ndl_list, NULL,
                                 timeout, result_up);

	lstcon_group_decref(grp);

        return rc;
}

int
lstcon_session_match(struct lst_sid sid)
{
        return (console_session.ses_id.ses_nid   == sid.ses_nid &&
                console_session.ses_id.ses_stamp == sid.ses_stamp) ?  1: 0;
}

static void
lstcon_new_session_id(struct lst_sid *sid)
{
	struct lnet_process_id id;

	LASSERT(console_session.ses_state == LST_SESSION_NONE);

	LNetGetId(1, &id);
	sid->ses_nid = id.nid;
	sid->ses_stamp = div_u64(ktime_get_ns(), NSEC_PER_MSEC);
}

int
lstcon_session_new(char *name, int key, unsigned feats,
		   int timeout, int force, struct lst_sid __user *sid_up)
{
        int     rc = 0;
        int     i;

        if (console_session.ses_state != LST_SESSION_NONE) {
                /* session exists */
                if (!force) {
			CNETERR("Session %s already exists\n",
				console_session.ses_name);
                        return -EEXIST;
                }

                rc = lstcon_session_end();

                /* lstcon_session_end() only return local error */
                if  (rc != 0)
                        return rc;
        }

	if ((feats & ~LST_FEATS_MASK) != 0) {
		CNETERR("Unknown session features %x\n",
			(feats & ~LST_FEATS_MASK));
		return -EINVAL;
	}

	for (i = 0; i < LST_GLOBAL_HASHSIZE; i++)
		LASSERT(list_empty(&console_session.ses_ndl_hash[i]));

	lstcon_new_session_id(&console_session.ses_id);

	console_session.ses_key	    = key;
	console_session.ses_state   = LST_SESSION_ACTIVE;
	console_session.ses_force   = !!force;
	console_session.ses_features = feats;
	console_session.ses_feats_updated = 0;
	console_session.ses_timeout = (timeout <= 0) ?
				      LST_CONSOLE_TIMEOUT : timeout;

	if (strlen(name) > sizeof(console_session.ses_name)-1)
		return -E2BIG;
	strlcpy(console_session.ses_name, name,
		sizeof(console_session.ses_name));

        rc = lstcon_batch_add(LST_DEFAULT_BATCH);
        if (rc != 0)
                return rc;

        rc = lstcon_rpc_pinger_start();
        if (rc != 0) {
		struct lstcon_batch *bat = NULL;

                lstcon_batch_find(LST_DEFAULT_BATCH, &bat);
                lstcon_batch_destroy(bat);

                return rc;
        }

	if (copy_to_user(sid_up, &console_session.ses_id,
			     sizeof(struct lst_sid)) == 0)
                return rc;

        lstcon_session_end();

        return -EFAULT;
}

int
lstcon_session_info(struct lst_sid __user *sid_up, int __user *key_up,
		    unsigned __user *featp,
		    struct lstcon_ndlist_ent __user *ndinfo_up,
		    char __user *name_up, int len)
{
	struct lstcon_ndlist_ent *entp;
	struct lstcon_ndlink *ndl;
	int rc = 0;

        if (console_session.ses_state != LST_SESSION_ACTIVE)
                return -ESRCH;

        LIBCFS_ALLOC(entp, sizeof(*entp));
        if (entp == NULL)
                return -ENOMEM;

	list_for_each_entry(ndl, &console_session.ses_ndl_list, ndl_link)
		LST_NODE_STATE_COUNTER(ndl->ndl_node, entp);

	if (copy_to_user(sid_up, &console_session.ses_id,
			 sizeof(struct lst_sid)) ||
	    copy_to_user(key_up, &console_session.ses_key,
			     sizeof(*key_up)) ||
	    copy_to_user(featp, &console_session.ses_features,
			     sizeof(*featp)) ||
	    copy_to_user(ndinfo_up, entp, sizeof(*entp)) ||
	    copy_to_user(name_up, console_session.ses_name, len))
                rc = -EFAULT;

        LIBCFS_FREE(entp, sizeof(*entp));

        return rc;
}

int
lstcon_session_end(void)
{
	struct lstcon_rpc_trans *trans;
	struct lstcon_group *grp;
	struct lstcon_batch *bat;
	int rc = 0;

        LASSERT (console_session.ses_state == LST_SESSION_ACTIVE);

	rc = lstcon_rpc_trans_ndlist(&console_session.ses_ndl_list,
                                     NULL, LST_TRANS_SESEND, NULL,
                                     lstcon_sesrpc_condition, &trans);
        if (rc != 0) {
                CERROR("Can't create transaction: %d\n", rc);
                return rc;
        }

        console_session.ses_shutdown = 1;

        lstcon_rpc_pinger_stop();

        lstcon_rpc_trans_postwait(trans, LST_TRANS_TIMEOUT);

        lstcon_rpc_trans_destroy(trans);
        /* User can do nothing even rpc failed, so go on */

        /* waiting for orphan rpcs to die */
        lstcon_rpc_cleanup_wait();

        console_session.ses_id    = LST_INVALID_SID;
        console_session.ses_state = LST_SESSION_NONE;
        console_session.ses_key   = 0;
        console_session.ses_force = 0;
	console_session.ses_feats_updated = 0;

	/* destroy all batches */
	while (!list_empty(&console_session.ses_bat_list)) {
		bat = list_entry(console_session.ses_bat_list.next,
				 struct lstcon_batch, bat_link);

		lstcon_batch_destroy(bat);
	}

	/* destroy all groups */
	while (!list_empty(&console_session.ses_grp_list)) {
		grp = list_entry(console_session.ses_grp_list.next,
				 struct lstcon_group, grp_link);
		LASSERT(grp->grp_ref == 1);

		lstcon_group_decref(grp);
	}

	/* all nodes should be released */
	LASSERT(list_empty(&console_session.ses_ndl_list));

	console_session.ses_shutdown = 0;
	console_session.ses_expired  = 0;

	return rc;
}

int
lstcon_session_feats_check(unsigned feats)
{
	int rc = 0;

	if ((feats & ~LST_FEATS_MASK) != 0) {
		CERROR("Can't support these features: %x\n",
		       (feats & ~LST_FEATS_MASK));
		return -EPROTO;
	}

	spin_lock(&console_session.ses_rpc_lock);

	if (!console_session.ses_feats_updated) {
		console_session.ses_feats_updated = 1;
		console_session.ses_features = feats;
	}

	if (console_session.ses_features != feats)
		rc = -EPROTO;

	spin_unlock(&console_session.ses_rpc_lock);

	if (rc != 0) {
		CERROR("remote features %x do not match with "
		       "session features %x of console\n",
		       feats, console_session.ses_features);
	}

	return rc;
}

static int
lstcon_acceptor_handle(struct srpc_server_rpc *rpc)
{
	struct srpc_msg *rep = &rpc->srpc_replymsg;
	struct srpc_msg *req = &rpc->srpc_reqstbuf->buf_msg;
	struct srpc_join_reqst *jreq = &req->msg_body.join_reqst;
	struct srpc_join_reply *jrep = &rep->msg_body.join_reply;
	struct lstcon_group *grp = NULL;
	struct lstcon_ndlink *ndl;
	int rc = 0;

        sfw_unpack_message(req);

	mutex_lock(&console_session.ses_mutex);

        jrep->join_sid = console_session.ses_id;

        if (console_session.ses_id.ses_nid == LNET_NID_ANY) {
                jrep->join_status = ESRCH;
                goto out;
        }

	if (lstcon_session_feats_check(req->msg_ses_feats) != 0) {
		jrep->join_status = EPROTO;
		goto out;
	}

        if (jreq->join_sid.ses_nid != LNET_NID_ANY &&
             !lstcon_session_match(jreq->join_sid)) {
                jrep->join_status = EBUSY;
                goto out;
        }

        if (lstcon_group_find(jreq->join_group, &grp) != 0) {
                rc = lstcon_group_alloc(jreq->join_group, &grp);
                if (rc != 0) {
                        CERROR("Out of memory\n");
                        goto out;
                }

		list_add_tail(&grp->grp_link,
			      &console_session.ses_grp_list);
		lstcon_group_addref(grp);
	}

        if (grp->grp_ref > 2) {
                /* Group in using */
                jrep->join_status = EBUSY;
                goto out;
        }

        rc = lstcon_group_ndlink_find(grp, rpc->srpc_peer, &ndl, 0);
        if (rc == 0) {
                jrep->join_status = EEXIST;
                goto out;
        }

        rc = lstcon_group_ndlink_find(grp, rpc->srpc_peer, &ndl, 1);
        if (rc != 0) {
                CERROR("Out of memory\n");
                goto out;
        }

        ndl->ndl_node->nd_state   = LST_NODE_ACTIVE;
        ndl->ndl_node->nd_timeout = console_session.ses_timeout;

        if (grp->grp_userland == 0)
                grp->grp_userland = 1;

	strlcpy(jrep->join_session, console_session.ses_name,
		sizeof(jrep->join_session));
        jrep->join_timeout = console_session.ses_timeout;
        jrep->join_status  = 0;

out:
	rep->msg_ses_feats = console_session.ses_features;
        if (grp != NULL)
		lstcon_group_decref(grp);

	mutex_unlock(&console_session.ses_mutex);

        return rc;
}

static struct srpc_service lstcon_acceptor_service;

static void lstcon_init_acceptor_service(void)
{
        /* initialize selftest console acceptor service table */
        lstcon_acceptor_service.sv_name    = "join session";
        lstcon_acceptor_service.sv_handler = lstcon_acceptor_handle;
        lstcon_acceptor_service.sv_id      = SRPC_SERVICE_JOIN;
	lstcon_acceptor_service.sv_wi_total = SFW_FRWK_WI_MAX;
}

static struct notifier_block lstcon_ioctl_handler = {
	.notifier_call = lstcon_ioctl_entry,
};

/* initialize console */
int
lstcon_console_init(void)
{
        int     i;
        int     rc;

	console_session.ses_id		    = LST_INVALID_SID;
	console_session.ses_state	    = LST_SESSION_NONE;
	console_session.ses_timeout	    = 0;
	console_session.ses_force	    = 0;
	console_session.ses_expired	    = 0;
	console_session.ses_feats_updated   = 0;
	console_session.ses_features	    = LST_FEATS_MASK;
	console_session.ses_laststamp = ktime_get_real_seconds();

	mutex_init(&console_session.ses_mutex);

	INIT_LIST_HEAD(&console_session.ses_ndl_list);
	INIT_LIST_HEAD(&console_session.ses_grp_list);
	INIT_LIST_HEAD(&console_session.ses_bat_list);
	INIT_LIST_HEAD(&console_session.ses_trans_list);

	CFS_ALLOC_PTR_ARRAY(console_session.ses_ndl_hash,
			       LST_GLOBAL_HASHSIZE);
	if (console_session.ses_ndl_hash == NULL)
		return -ENOMEM;

	for (i = 0; i < LST_GLOBAL_HASHSIZE; i++)
		INIT_LIST_HEAD(&console_session.ses_ndl_hash[i]);


        /* initialize acceptor service table */
        lstcon_init_acceptor_service();

	rc = srpc_add_service(&lstcon_acceptor_service);
	LASSERT(rc != -EBUSY);
	if (rc != 0) {
		CFS_FREE_PTR_ARRAY(console_session.ses_ndl_hash,
				   LST_GLOBAL_HASHSIZE);
		return rc;
	}

	rc = srpc_service_add_buffers(&lstcon_acceptor_service,
				      lstcon_acceptor_service.sv_wi_total);
	if (rc != 0) {
                rc = -ENOMEM;
                goto out;
        }

	rc = blocking_notifier_chain_register(&libcfs_ioctl_list,
					      &lstcon_ioctl_handler);
	if (rc == 0) {
		lstcon_rpc_module_init();
		return 0;
	}

out:
	srpc_shutdown_service(&lstcon_acceptor_service);
	srpc_remove_service(&lstcon_acceptor_service);

	CFS_FREE_PTR_ARRAY(console_session.ses_ndl_hash, LST_GLOBAL_HASHSIZE);

	srpc_wait_service_shutdown(&lstcon_acceptor_service);

	return rc;
}

int
lstcon_console_fini(void)
{
	int i;

	blocking_notifier_chain_unregister(&libcfs_ioctl_list,
					   &lstcon_ioctl_handler);

	mutex_lock(&console_session.ses_mutex);

        srpc_shutdown_service(&lstcon_acceptor_service);
        srpc_remove_service(&lstcon_acceptor_service);

        if (console_session.ses_state != LST_SESSION_NONE)
                lstcon_session_end();

        lstcon_rpc_module_fini();

	mutex_unlock(&console_session.ses_mutex);

	LASSERT(list_empty(&console_session.ses_ndl_list));
	LASSERT(list_empty(&console_session.ses_grp_list));
	LASSERT(list_empty(&console_session.ses_bat_list));
	LASSERT(list_empty(&console_session.ses_trans_list));

	for (i = 0; i < LST_NODE_HASHSIZE; i++)
		LASSERT(list_empty(&console_session.ses_ndl_hash[i]));

	CFS_FREE_PTR_ARRAY(console_session.ses_ndl_hash,
			   LST_GLOBAL_HASHSIZE);

	srpc_wait_service_shutdown(&lstcon_acceptor_service);

	return 0;
}

